package com.tpvlog.dfs.datanode.server;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Processor线程
 *
 * @author Ressmix
 */
public class NioProcessor extends Thread {
    // Processor唯一标识
    private volatile Integer processorId;

    // 等待注册连接的队列
    private ConcurrentLinkedQueue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<>();

    // 多路复用监听时的最大阻塞时间
    private static final Long POLL_BLOCK_MAX_TIME = 1000L;

    // 每个Processor私有的Selector多路复用器
    private Selector selector;

    // 缓存未读完的请求，Key为客户端IP
    private Map<String, NetworkRequest> cachedRequests = new HashMap<>();

    // 缓存未发送完的响应，Key为客户端IP
    private Map<String, NetworkResponse> cachedResponses = new HashMap<>();

    // 当前Processor维护的所有SelectionKey，Key为客户端IP
    private Map<String, SelectionKey> cachedKeys = new HashMap<>();

    public NioProcessor(Integer processorId) {
        super();
        this.processorId = processorId;
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Integer getProcessorId() {
        return this.processorId;
    }

    public void addChannel(SocketChannel channel) {
        channelQueue.offer(channel);
        // 唤醒Selector
        // 因为Processor自身线程可能在阻塞等待，所以当有新连接添加队列时，需要由server线程唤起它
        selector.wakeup();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1.不断对已经建立连接的Channel注册OP_READ
                registerQueuedClients();
                // 2.不断处理响应
                cacheQueuedResponse();
                // 3.以限时阻塞的方式感知连接中的请求
                poll();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /*----------------------------------------- PRIVATE METHOD --------------------------------------------*/
    private void registerQueuedClients() {
        SocketChannel channel = null;
        // 不断出队元素
        while ((channel = channelQueue.poll()) != null) {
            try {
                // 将已经建立连接的Channel注册到Selector上，并监听它的OP_READ事件
                channel.register(selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        }
    }

    private void cacheQueuedResponse() {
        NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
        NetworkResponse response = null;
        // 遍历当前Processor自己的响应队列中的响应
        while ((response = responseQueues.poll(processorId)) != null) {
            String client = response.getClient();
            cachedResponses.put(client, response);
            // 关注OP_WRITE事件
            cachedKeys.get(client).interestOps(SelectionKey.OP_WRITE);
        }
    }

    private void poll() {
        try {
            // 这里Processor线程可能会阻塞等待
            int keys = selector.select(POLL_BLOCK_MAX_TIME);
            if (keys > 0) {
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    try {
                        SelectionKey key = keyIterator.next();
                        keyIterator.remove();

                        SocketChannel channel = (SocketChannel) key.channel();
                        // 客户端IP地址
                        String client = channel.getRemoteAddress().toString();

                        // 1.发生读事件
                        if (key.isReadable()) {
                            NetworkRequest request = null;
                            if (cachedRequests.get(client) != null) {
                                // 缓存中有，说明上一次未读完，出现了拆包
                                request = cachedRequests.get(client);
                            } else {
                                request = new NetworkRequest();
                            }
                            // 执行读取操作
                            request.setChannel(channel);
                            request.setKey(key);
                            request.read();
                            // 1.1读取完成
                            if (request.hasCompletedRead()) {
                                // 将完整的请求分发到一个全局请求队列中，由IO线程处理
                                request.setClient(client);
                                NetworkRequestQueue.getInstance().offer(request);
                                cachedKeys.put(client, key);
                                // 删除缓存
                                cachedRequests.remove(client);
                                // 取消对OP_READ的关注
                                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                            }
                            // 1.2 没有读取完成，缓存等待下次继续读取
                            else {
                                cachedRequests.put(client, request);
                            }
                        }
                        // 2.发生写事件
                        else if (key.isWritable()) {
                            NetworkResponse response = cachedResponses.get(client);
                            // 发送响应
                            channel.write(response.getBuffer());
                            cachedResponses.remove(client);
                            cachedKeys.remove(client);
                            // 取消对OP_WRITE事件的关注
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
